<?php

namespace App\Process;

use App\Model\FormIds;
use App\Model\UsersModel;
use App\Queue\Queue;
use Carbon\Carbon;
use EasySwoole\EasySwoole\Logger;
use EasySwoole\Component\Process\AbstractProcess;

/**
 * Background process that consumes the "message-create" queue.
 *
 * For every task read from "message-create" it calls UsersModel::update()
 * with id 1 and the current timestamp, then pushes each distinct `openid`
 * found via the FormIds model onto the "message-push" queue.
 */
class MessageCreate extends AbstractProcess
{
    /**
     * True while a drain coroutine is active, so overlapping ticks do not
     * start a second consumer.
     *
     * @var bool
     */
    private $isRun = false;

    /**
     * Registers a 500 ms tick; each tick starts a drain coroutine unless one
     * is already running.
     *
     * @param mixed $arg Argument supplied by the process framework (unused here).
     */
    public function run($arg)
    {
        // Every 500 ms check whether there are tasks; if so, process them in a loop.
        $this->addTick(500, function () {
            if ($this->isRun) {
                return;
            }

            $this->isRun = true;
            $queue = new Queue();

            $coroutineId = go(function () use ($queue) {
                try {
                    $this->drainQueue($queue);
                } finally {
                    // Always release the guard, even if logging itself fails.
                    $this->isRun = false;
                }
            });

            // go() reports failure to create the coroutine with false; without
            // this reset the guard would stay set and no later tick would run.
            if ($coroutineId === false) {
                $this->isRun = false;
            }
        });
    }

    /**
     * Reads "message-create" tasks until the queue returns a falsy value or
     * an error occurs, fanning each task out to the "message-push" queue.
     *
     * Any Throwable stops the loop (as before) but is now written to the log
     * instead of being discarded.
     */
    private function drainQueue(Queue $queue): void
    {
        while (true) {
            try {
                $task = $queue->onQueue('message-create')->read();
                if (!$task) {
                    return;
                }

                $users = new UsersModel();
                $users->update(['id' => 1, 'created_at' => Carbon::now()]);

                $formIds = new FormIds();
                $lists = $formIds->select('DISTINCT(`openid`)')->get();
                foreach ($lists as $item) {
                    $queue->onQueue('message-push')->push($item['openid']);
                }
            } catch (\Throwable $throwable) {
                Logger::getInstance()->log(sprintf(
                    'MessageCreate: task processing failed: %s in %s:%d',
                    $throwable->getMessage(),
                    $throwable->getFile(),
                    $throwable->getLine()
                ));

                return;
            }
        }
    }

    /**
     * Shutdown hook required by the base process class; intentionally a no-op.
     */
    public function onShutDown()
    {
        // TODO: Implement onShutDown() method.
    }

    /**
     * Receive hook required by the base process class; intentionally a no-op.
     *
     * @param string $str     Received payload.
     * @param mixed  ...$args Additional arguments passed by the framework.
     */
    public function onReceive(string $str, ...$args)
    {
        // TODO: Implement onReceive() method.
    }
}
